package com.digitalchina.dcn.dscc.portal.client;

import com.digitalchina.dcn.dscc.portal.protocol.PortalMessage;
import com.digitalchina.dcn.dscc.portal.server.MessageSendCallback;
import com.digitalchina.dcn.dscc.portal.server.PortalChannelHandler;
import com.digitalchina.dcn.dscc.portal.server.PortalMessageHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.DatagramChannel;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.util.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Thin client wrapper around a Netty {@link Channel} used to send {@link PortalMessage}s to the
 * remote peer (called the "AC" in this class's log messages).
 *
 * <p>Sending is asynchronous: {@link #send(PortalMessage, MessageSendCallback)} returns as soon as
 * the write has been handed to the channel, and the outcome is reported later through the supplied
 * {@link MessageSendCallback}.
 */
public class PortalClient implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(PortalClient.class);

    /** Upper bound, in seconds, that {@link #close()} waits for the channel to finish closing. */
    private static final long CLOSE_TIMEOUT_SECONDS = 10;

    private final Channel channel;

    /**
     * Creates a client that sends over the given channel.
     *
     * @param channel the channel all messages are written to and that {@link #close()} closes
     */
    public PortalClient(Channel channel) {
        this.channel = channel;
    }

    /**
     * Closes the underlying channel, waiting (uninterruptibly) for at most
     * {@value #CLOSE_TIMEOUT_SECONDS} seconds for the close to complete.
     */
    @Override
    public void close() {
        // close is a local operation and should finish within milliseconds; timeout just to be safe
        channel.close().awaitUninterruptibly(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Writes and flushes {@code msg} on the channel and reports the result through {@code callback}.
     *
     * <p>When the write succeeds, {@link MessageSendCallback#onSuccess} is invoked with the message.
     * When it fails, the error is logged, the channel is closed, and
     * {@link MessageSendCallback#onFailure} is invoked with the message's user IP, its type and an
     * {@link IOException} wrapping the write failure. Exceptions thrown by either callback method
     * are caught and logged so they cannot escape the channel listener.
     *
     * @param msg      the message to send
     * @param callback receives the outcome of the write
     */
    public void send(final PortalMessage msg, final MessageSendCallback callback) {
        // nanoTime is monotonic, so the measured duration is not skewed by wall-clock adjustments.
        final long startNanos = System.nanoTime();
        logger.trace("Sending message {} to AC", msg.type());

        channel.writeAndFlush(msg).addListener(
                new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                            logger.trace("Sending request {} to AC took {} ms", msg.type(), timeTaken);
                            // Guarded the same way as the failure path: a misbehaving callback must
                            // not propagate out of the listener.
                            try {
                                callback.onSuccess(msg);
                            } catch (Exception e) {
                                logger.error("Uncaught exception in message send callback handler!", e);
                            }
                        } else {
                            String errorMsg = String.format("Failed to send message %s to AC: %s", msg.type(),
                                    future.cause());
                            logger.error(errorMsg, future.cause());
                            // A failed write leaves the channel in an unknown state; drop it before
                            // notifying the caller.
                            channel.close();
                            try {
                                callback.onFailure(msg.userIP(), msg.type(), new IOException(errorMsg, future.cause()));
                            } catch (Exception e) {
                                logger.error("Uncaught exception in message send callback handler!", e);
                            }
                        }
                    }
                });
    }
}
